Java Concurrent - AQS & FutureTask

Overview

基于Java1.6, 前后的版本实现都有所不同.

确定入口

在通常情况下, 我们是配合线程池获取一个Future的:

1
2
3
4
Callable<Integer> callable = () -> {
return 1;
};
Future<Integer> future = Executors.newSingleThreadExecutor().submit(callable);

观察submit方法内部, 不考虑”池”的概念实际上等同于下面操作:

1
2
RunnableFuture<Integer> runnableFuture = new FutureTask<>(callable);
new Thread(runnableFuture).start();

RunnableFuture本身实现了RunnableFuture. 从而可以确定FutureTask的两个重要入口方法: run, get.

Sync

观察FutureTask内部, 同样有一个Sync. 和CountDownLatch相似, FutureTask本身也是个共享的同步工具. 由于一个线程被unpark后, 会继续传播PROPAGATE, 所以当FutureTask中的任务执行完成后, 所有调用get方法的线程都会退出阻塞. CountDownLatchstate表示数量, 而在FutureTask则表示生命周期的状态:

  • READY = 0 刚刚new FutureTask后的状态.
  • RUNNING = 1
  • RAN = 2 表示执行完毕
  • CANCELED = 4

如果强行类比的话, 那么get就对应CountDownLatchawait. run/cancel则对应countDown.

get

调用关系如下:

FutureTask#get -> Sync#innerGet -> AQS#acquireSharedInterruptibly(0)

根据我们之前的了解, 如果没有tryAcquireShared(0)成功, 则当前线程会被挂起. 在此处, tryAcquireShared的参数是没有任何意义的, 所以形参被取名叫ignore.

1
2
3
protected int tryAcquireShared(int ignore) {
return innerIsDone() ? 1 : -1; // 大于等于0 表示acquire成功
}

innerIsDone大致为判断了状态是否为RAN或者CANCELED, 也就是说, 当其他线程将state设置为这两种状态之后, 等待线程才能结束运行. 当等待线程被唤醒后, 会做两个判断, 代码非常简单:

1
2
3
4
5
if (getState() == CANCELLED)
throw new CancellationException();
if (exception != null)
throw new ExecutionException(exception);
return result;

run

FutureTask#run 中仅为调用 Sync#innerRun. 其中执行了callable#call, 没有异常则set, 反之则setException. 分别对应Sync#innerSet, Sync#innerSetException.

观察两个方法, 其中的重要内容就是把stateCAS为RAN, 并doReleaseShared(0). 在CountDownLatch中有提到过这个方法, 子类的重点在于tryReleaseShared是如何实现的, 一定是返回true的, 且内容非常简单粗暴:

1
2
3
4
protected boolean tryReleaseShared(int ignore) {
runner = null;
return true;
}

可以看到, 这个方法的参数同样是无意义的. 其中的runner指的是执行FutureTask的线程, 即调用set,setException的线程. 实际上刚刚的innerIsDone除了判断状态之外, 还要求runner == null, 对应地, 他在tryReleaseShared这里被置为null.

cancel

大体和set, setException一致, 都是将stateCAS为一个值, 此处为CANCELED. 并且执行doReleaseShared. 除次之外, 如果为cancel(true)还会根据参数对执行任务的线程进行interrupt. 但是我们知道interrupt不一定成功, 观察下面的栗子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public static void main(String[] args) throws InterruptedException, ExecutionException, TimeoutException {
Callable<Integer> callable = () -> {
spinFor(4, TimeUnit.SECONDS);
System.out.println("Done");
return 1;
};

RunnableFuture<Integer> runnableFuture = new FutureTask<>(callable);
new Thread(runnableFuture).start();

Thread.sleep(1000); // ensure the other thread has started
runnableFuture.cancel(true);
System.out.println(runnableFuture.get(3, TimeUnit.SECONDS));
}

private static void spinFor(long duration, TimeUnit timeUnit) {
long nanos = TimeUnit.NANOSECONDS.convert(duration, timeUnit);
long lastTime = System.nanoTime();

while (System.nanoTime() - lastTime < nanos) {
// nothing
}
}

可以发现, cancel之后, get线程马上就得到响应, 抛出异常了. 而执行线程仍然要在等待几秒后执行完.